LockSupport的park等待的底层实现

您所在的位置:网站首页 locksupport park源码 LockSupport的park等待的底层实现

LockSupport的park等待的底层实现

2023-08-13 02:00| 来源: 网络整理| 查看: 265

从上一篇文章中的JDK的延迟队列中,最终是通过LockSupport.park实现线程的等待,那么底层是如何实现等待和超时等待的

LockSupport的park和unpark的方法 public static void park() { UNSAFE.park(false, 0L); } public static void parkNanos(long nanos) { if (nanos > 0) UNSAFE.park(false, nanos); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } 复制代码

从上面可以看到实际LockSupport.park是通过Unsafe的的park方法实现,从下面的方法可以看出这个是一个native方法.

/** * Blocks current thread, returning when a balancing * {@code unpark} occurs, or a balancing {@code unpark} has * already occurred, or the thread is interrupted, or, if not * absolute and time is not zero, the given time nanoseconds have * elapsed, or if absolute, the given deadline in milliseconds * since Epoch has passed, or spuriously (i.e., returning for no * "reason"). Note: This operation is in the Unsafe class only * because {@code unpark} is, so it would be strange to place it * elsewhere. */ public native void park(boolean isAbsolute, long time); 复制代码 JVM的Unsafe的park方法

从下面JDK中代码中可以thread的Parker的对象的park方法进行一段时间的等待。

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) { HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time); EventThreadPark event; JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); if (event.should_commit()) { const oop obj = thread->current_park_blocker(); if (time == 0) { post_thread_park_event(&event, obj, min_jlong, min_jlong); } else { if (isAbsolute != 0) { post_thread_park_event(&event, obj, min_jlong, time); } else { post_thread_park_event(&event, obj, time, min_jlong); } } } HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker()); } UNSAFE_END 复制代码

Thread.hpp的文件中内部定义的Park对象

private: Parker _parker; public: Parker* parker() { return &_parker; } 复制代码

下面是Os_posix.cpp中是Linux中实现的Park的park的实现方式

首先将_counter的变量通过CAS设置为0,返回就旧的值,如果之前是大于0,则说明是允许访问,不用阻塞,直接返回。 获取当前线程。 判断线程是否是中断中,如果是,则直接返回,(也就是说线程处于中断状态下会忽略park,不会阻塞等待) 判断如果传入的time参数小于0 或者 是绝对时间并且time是0,则直接返回,(上面的Unsafe调用park传入的参数是 false、0,所以不满足这种情况) 如果time大于0,则转换成绝对时间。 创建ThreadBlockInVM对象,并且调用pthread_mutex_trylock获取线程互斥锁,如果没有获取到锁,则直接返回, 判断_counter变量是否大于0,如果是,则重置_counter为0,释放线程锁,直接返回。 调用 OrderAccess::fence(); 加入内存屏障,禁止指令重排序,确保加锁和释放锁的指令的顺序。 创建OSThreadWaitState对象, 判断time是否大于0,如果是0,则调用pthread_cond_wait进行等待,如果不是0,然后调用pthread_cond_timedwait进行时间参数为absTime的等待, 调用pthread_mutex_unlock进行释放_mutex锁, 再次调用OrderAccess::fence()禁止指令重排序。 // Parker::park decrements count if > 0, else does a condvar wait. Unpark // sets count to 1 and signals condvar. Only one thread ever waits // on the condvar. Contention seen when trying to park implies that someone // is unparking you, so don't wait. And spurious returns are fine, so there // is no need to track notifications. void Parker::park(bool isAbsolute, jlong time) { // Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. if (Atomic::xchg(&_counter, 0) > 0) return; JavaThread *jt = JavaThread::current(); // Optional optimization -- avoid state transitions if there's // an interrupt pending. if (jt->is_interrupted(false)) { return; } // Next, demultiplex/decode time arguments struct timespec absTime; if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all return; } if (time > 0) { to_abstime(&absTime, time, isAbsolute, false); } // Enter safepoint region // Beware of deadlocks such as 6317397. // The per-thread Parker:: mutex is a classic leaf-lock. // In particular a thread must never block on the Threads_lock while // holding the Parker:: mutex. If safepoints are pending both the // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock. ThreadBlockInVM tbivm(jt); // Can't access interrupt state now that we are _thread_blocked. If we've // been interrupted since we checked above then _counter will be > 0. // Don't wait if cannot get lock since interference arises from // unparking. if (pthread_mutex_trylock(_mutex) != 0) { return; } int status; if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "invariant"); // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; } OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */); assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait(&_cond[_cur_index], _mutex); assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT), status, "cond_wait"); }else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime); assert_status(status == 0 || status == ETIMEDOUT, status, "cond_timedwait"); } _cur_index = -1; _counter = 0; status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "invariant"); // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); 复制代码 Linux操作系统是如何实现pthread_cond_timedwait进行时间等待的

pthread_cond_timedwait函数位于glibc中pthread_cond_wait.c, 可以看到是调用 __pthread_cond_wait_common实现

/* See __pthread_cond_wait_common. */ int ___pthread_cond_timedwait64 (pthread_cond_t *cond, pthread_mutex_t *mutex, const struct __timespec64 *abstime) { /* Check parameter validity. This should also tell the compiler that it can assume that abstime is not NULL. */ if (! valid_nanoseconds (abstime->tv_nsec)) return EINVAL; /* Relaxed MO is suffice because clock ID bit is only modified in condition creation. */ unsigned int flags = atomic_load_relaxed (&cond->__data.__wrefs); clockid_t clockid = (flags & __PTHREAD_COND_CLOCK_MONOTONIC_MASK) ? CLOCK_MONOTONIC : CLOCK_REALTIME; return __pthread_cond_wait_common (cond, mutex, clockid, abstime); } 复制代码

下面__pthread_cond_wait_common是实现通过__futex_abstimed_wait_cancelable64实现时间等待

static __always_inline int __pthread_cond_wait_common (pthread_cond_t *cond, pthread_mutex_t *mutex, clockid_t clockid, const struct __timespec64 *abstime) { ''省略''` err = __futex_abstimed_wait_cancelable64 ( cond->__data.__g_signals + g, 0, clockid, abstime, private); ''省略''` } 复制代码

__futex_abstimed_wait_cancelable64是调用__futex_abstimed_wait_common

int __futex_abstimed_wait_cancelable64 (unsigned int* futex_word, unsigned int expected, clockid_t clockid, const struct __timespec64* abstime, int private) { return __futex_abstimed_wait_common (futex_word, expected, clockid, abstime, private, true); } 复制代码

__futex_abstimed_wait_common下面则是通过判断平台是64位或者32位,调用__futex_abstimed_wait_common64或者__futex_abstimed_wait_common32

static int __futex_abstimed_wait_common (unsigned int* futex_word, unsigned int expected, clockid_t clockid, const struct __timespec64* abstime, int private, bool cancel) { int err; unsigned int clockbit; /* Work around the fact that the kernel rejects negative timeout values despite them being valid. */ if (__glibc_unlikely ((abstime != NULL) && (abstime->tv_sec < 0))) return ETIMEDOUT; if (! lll_futex_supported_clockid (clockid)) return EINVAL; clockbit = (clockid == CLOCK_REALTIME) ? FUTEX_CLOCK_REALTIME : 0; int op = __lll_private_flag (FUTEX_WAIT_BITSET | clockbit, private); #ifdef __ASSUME_TIME64_SYSCALLS err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime, private, cancel); #else bool need_time64 = abstime != NULL && !in_time_t_range (abstime->tv_sec); if (need_time64) { err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime, private, cancel); if (err == -ENOSYS) err = -EOVERFLOW; } else err = __futex_abstimed_wait_common32 (futex_word, expected, op, abstime, private, cancel); #endif switch (err) { case 0: case -EAGAIN: case -EINTR: case -ETIMEDOUT: case -EINVAL: case -EOVERFLOW: /* Passed absolute timeout uses 64 bit time_t type, but underlying kernel does not support 64 bit time_t futex syscalls. */ return -err; case -EFAULT: /* Must have been caused by a glibc or application bug. */ case -ENOSYS: /* Must have been caused by a glibc bug. */ /* No other errors are documented at this time. */ default: futex_fatal_error (); } } 复制代码

__futex_abstimed_wait_common64是调用INTERNAL_SYSCALL_CANCEL宏定义实现

static int __futex_abstimed_wait_common64 (unsigned int* futex_word, unsigned int expected, int op, const struct __timespec64* abstime, int private, bool cancel) { if (cancel) return INTERNAL_SYSCALL_CANCEL (futex_time64, futex_word, op, expected, abstime, NULL /* Unused. */, FUTEX_BITSET_MATCH_ANY); else return INTERNAL_SYSCALL_CALL (futex_time64, futex_word, op, expected, abstime, NULL /* Ununsed. */, FUTEX_BITSET_MATCH_ANY); } 复制代码

系统调用的的宏定义

/* Issue a syscall defined by syscall number plus any other argument required. Any error will be returned unmodified (including errno). */ #define INTERNAL_SYSCALL_CANCEL(...) \ ({ \ long int sc_ret; \ if (NO_SYSCALL_CANCEL_CHECKING) \ sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__); \ else \ { \ int sc_cancel_oldtype = LIBC_CANCEL_ASYNC (); \ sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__); \ LIBC_CANCEL_RESET (sc_cancel_oldtype); \ } \ sc_ret; \ }) 复制代码

总结 主要对LockSupport的park等待实现的底层实现的浅析,针对于Linux的系统调用还没有找到源码,后续会继续跟踪,希望有读者知道的满帆可以告知下,谢谢。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3